#!/usr/bin/env python3
# Copyright (C) 2023 Checkmk GmbH - License: GNU General Public License v2
# This file is part of Checkmk (https://checkmk.com). It is subject to the terms and
# conditions defined in the file COPYING, which is part of this source code package.

import logging
import pprint
import re
import subprocess
import time
from collections.abc import Iterator
from pathlib import Path

import pytest

from cmk.ec.config import (  # pylint: disable=cmk-module-layer-violation
    ECRulePackSpec,
    EventLimit,
    Rule,
    ServiceLevel,
    State,
)
from cmk.gui.watolib.site_changes import ChangeSpec  # pylint: disable=cmk-module-layer-violation
from tests.testlib.site import Site

# Module-level logger named after this module; the helpers in this file log through it.
logger = logging.getLogger(__name__)


def _validate_process_return_code(process: subprocess.Popen, assert_msg: str) -> None:
    """Wait for `process` to finish and validate that its return code is 0.

    Raises `AssertionError`, if not. `AssertionError` is raised with
    + custom message
    + stdout generated by the process
    + stderr generated by the process.

    NOTE: `process` should use subprocess.PIPE for stdout and stderr. Both pipes are fully
    consumed by this function, regardless of the return code.
    """
    # `communicate()` drains stdout/stderr while waiting for the process to exit. Calling
    # `wait()` first can deadlock once the child fills the OS pipe buffer.
    p_out, p_err = process.communicate()
    if process.returncode != 0:
        msg = (assert_msg, f"STDOUT: {p_out}", f"STDERR: {p_err}")
        raise AssertionError("\n".join(msg))


def _execute_cmd_and_validate_return_code(
    site: Site, cmd: list[str], assert_msg: str
) -> tuple[str, str]:
    """Run `cmd` via `site.run` and return the captured `(stdout, stderr)`.

    If `site.run` raises `subprocess.CalledProcessError`, `assert_msg` is attached to the
    exception as a note and the exception is re-raised.
    """
    try:
        completed = site.run(cmd)
    except subprocess.CalledProcessError as error:
        # give the failure some context before passing it on
        error.add_note(assert_msg)
        raise
    else:
        return completed.stdout, completed.stderr


def _get_ec_rule_packs(
    rule_id: str, title: str, state: State, match: str, limit: int = 0
) -> list[ECRulePackSpec]:
    """Build the EC rule packs to inject in the test-site.

    The returned list holds one enabled rule pack containing one enabled rule. `rule_id` is
    used as ID for both the rule and the rule pack; `limit` is the limit of the rule's event
    limit.
    """
    service_level = ServiceLevel(value=0, precedence="message")
    event_limit = EventLimit(action="None", limit=limit)
    single_rule = Rule(
        id=rule_id,
        description="",
        comment="",
        docu_url="",
        disabled=False,
        state=state,
        sl=service_level,
        actions=[],
        actions_in_downtime=True,
        cancel_actions=[],
        cancel_action_phases="always",
        autodelete=False,
        event_limit=event_limit,
        match=match,
        invert_matching=False,
    )
    rule_pack = ECRulePackSpec(id=rule_id, title=title, disabled=False, rules=[single_rule])
    return [rule_pack]


def _get_replication_change() -> ChangeSpec:
    """Build the replication change to inject in the test-site.

    The change describes an "edit-rule-pack" action by user "cmkadmin" in the "ec" domain,
    flagged as needing both sync and restart and as not yet activated.
    """
    pending_change = ChangeSpec(
        id="",
        action_name="edit-rule-pack",
        text="Modified rule pack test",
        object=None,
        user_id="cmkadmin",
        domains=["ec"],
        time=0,
        need_sync=True,
        need_restart=True,
        domain_settings={},
        prevent_discard_changes=False,
        diff_text=None,
        has_been_activated=False,
    )
    return pending_change


def _write_ec_rule(site: Site, rule: list | None) -> None:
    """Write `rule` to the site's EC rules file (`etc/check_mk/mkeventd.d/wato/rules.mk`).

    A falsy `rule` (`None` or an empty list) results in empty content being written.
    """
    rules_file = str(site.path("etc/check_mk/mkeventd.d/wato/rules.mk"))
    content = ""
    if rule:
        content = f"rule_packs += {rule}"
    site.write_file(rules_file, content)


def _activate_ec_changes(site: Site) -> None:
    """Write a pending EC change to the site's replication changes file and activate it.

    Activation is triggered through the site's REST API client, including foreign changes,
    and this function returns once `activate_and_wait_for_completion` returns.
    """
    changes_file = str(site.path(f"var/check_mk/wato/replication_changes_{site.id}.mk"))
    pending_change = str(_get_replication_change())
    site.write_file(changes_file, pending_change)
    site.openapi.changes.activate_and_wait_for_completion(force_foreign_changes=True)


def _generate_message_via_events_pipe(site: Site, message: str, end_of_line: bool = True) -> None:
    """Generate EC message by echoing it into `tmp/run/mkeventd/events` of the site.

    The `echo` is executed in a login shell of the site user (`sudo su -l <site>`). With
    `end_of_line=False`, `echo -n` is used. Raises `AssertionError` if the command returns a
    non-zero exit code.
    """
    events_path = site.path("tmp/run/mkeventd/events")
    echo_flag = "" if end_of_line else "-n"
    shell_cmd = f"sudo su -l {site.id} -c 'echo {echo_flag} {message} > {events_path}'"
    logger.info("Executing: %s", shell_cmd)
    echo_process = subprocess.Popen(
        shell_cmd,
        encoding="utf-8",
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    with echo_process:
        _validate_process_return_code(
            echo_process, "Failed to generate EC message via Unix socket."
        )


def _generate_message_via_syslog(
    site: Site,
    message: str,
    udp: bool = True,
    end_of_line: bool = True,
    timeout: int = 5,
) -> None:
    """Generate EC message by piping it through `nc` to 127.0.0.1, port 514.

    The command is executed in a login shell of the site user (`sudo su -l <site>`).

    * `udp`: pass `-u` to `nc`; otherwise `nc` is called without it.
    * `end_of_line`: if `False`, `echo -n` is used.
    * `timeout`: value passed to `nc -w`.

    Raises `AssertionError` if the command returns a non-zero exit code.
    """
    echo_flag = "" if end_of_line else "-n"
    protocol_flag = "-u" if udp else ""
    shell_cmd = (
        f"sudo su -l {site.id} -c 'echo {echo_flag} {message} | nc -w {timeout} "
        f"{protocol_flag} 127.0.0.1 514'"
    )
    logger.info("Executing: %s", shell_cmd)
    nc_process = subprocess.Popen(
        shell_cmd,
        encoding="utf-8",
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    with nc_process:
        _validate_process_return_code(nc_process, "Failed to generate EC message via Syslog.")


def _wait_for_queried_column(
    site: Site,
    query: str,
    sleep_time: float = 2,
    max_count: int = 20,
    strict: bool = True,
) -> list[str]:
    """Poll livestatus until `query` returns a non-empty column or the retries are used up.

    The query is issued once up-front and then again after each of at most `max_count`
    sleeps of `sleep_time` seconds. With `strict=True`, an empty final result raises
    `AssertionError`; otherwise the (possibly empty) result is returned.
    """
    result = site.live.query_column(query)
    retries_left = max_count
    while not result and retries_left > 0:
        logger.info("Waiting for the following livestatus query: %s", repr(query))
        time.sleep(sleep_time)
        retries_left -= 1
        result = site.live.query_column(query)

    if strict:
        assert result, f"Failed to retrieve livestatus query: {query!r}"
    return result


def _wait_for_event_message_in_log(
    site: Site, message: str, log_path: Path, sleep_time: float = 2, max_count: int = 20
) -> None:
    """Poll the site file `log_path` until it contains `message`.

    The file is checked before each of at most `max_count` sleeps of `sleep_time` seconds
    and one final time afterwards. Raises `AssertionError` if the message is still missing
    at that point.
    """
    for _ in range(max_count):
        if message in site.read_file(log_path):
            return
        # Pass all values as lazy %-args. Interpolating the path via f-string into the
        # format string would break the log call for paths containing a '%'.
        logger.info(
            "Waiting for the following message in log file '%s': %s", log_path, repr(message)
        )
        time.sleep(sleep_time)

    # last check after the final sleep; this is the one that fails the test
    assert message in site.read_file(log_path), (
        f"Failed to retrieve event message '{message}' in log file '{log_path}'"
    )


def _get_snmp_trap_cmd(event_message: str) -> list:
    """Return the `snmptrap` command line (as argument list) carrying `event_message`.

    The command addresses 127.0.0.1 with `-v 1 -c public`. `event_message` is the last
    argument, wrapped in literal double quotes.
    """
    oid = ".1.3.6.1"
    command = ["snmptrap", "-v", "1", "-c", "public", "127.0.0.1"]
    command += [oid, "192.168.178.30", "6", "17", '""']
    command += [oid, "s", f'"{event_message}"']
    return command


@pytest.fixture(name="setup_ec", scope="function")
def _setup_ec(site: Site) -> Iterator[tuple[str, str, State]]:
    """Install one EC rule for a single test and clean up rules and events afterwards.

    Yields the rule's match text, the rule ID and the rule state.
    """
    match_text = "dummy"
    ec_rule_id = f"test {match_text}"
    ec_rule_state: State = 1

    rule_packs = _get_ec_rule_packs(
        title="", rule_id=ec_rule_id, state=ec_rule_state, match=match_text
    )
    _write_ec_rule(site, rule_packs)

    # the rule only takes effect after a change in the EC domain has been injected and
    # activated
    _activate_ec_changes(site)

    yield match_text, ec_rule_id, ec_rule_state

    # teardown, part 1: drop the EC rules again and activate that
    _write_ec_rule(site, rule=None)
    _activate_ec_changes(site)

    # teardown, part 2: delete whatever events are still listed (possibly none)
    leftover_event_ids = _wait_for_queried_column(
        site, "GET eventconsoleevents\nColumns: event_id\n", max_count=3, strict=False
    )
    for event_id in leftover_event_ids:
        payload = {
            "site_id": site.id,
            "filter_type": "by_id",
            "event_id": event_id,
        }
        resp = site.openapi.post(
            "domain-types/event_console/actions/delete/invoke",
            headers={"Content-Type": "application/json"},
            json=payload,
        )

        assert resp.status_code == 204, pprint.pformat(resp.json())


@pytest.fixture(name="restart_site", scope="module")
def _restart_site(site: Site) -> Iterator[None]:
    """Stop the site on setup and start it again on teardown (module-scoped)."""
    # setup: the site is stopped before anything depending on this fixture runs
    site.stop()
    yield
    # teardown: bring the site up again
    site.start()


@pytest.fixture(name="enable_receivers", scope="module")
def _enable_receivers(site: Site, restart_site: None) -> Iterator[None]:
    """Switch the SNMP trap, syslog and syslog TCP receivers on for the module's tests.

    For each receiver the current `omd config` value is recorded before it is set to "on";
    the site is started afterwards. On teardown the site is stopped and the recorded values
    are written back.
    """
    receivers = ("MKEVENTD_SNMPTRAP", "MKEVENTD_SYSLOG", "MKEVENTD_SYSLOG_TCP")

    previous_values: dict = {}
    for name in receivers:
        stdout, _stderr = _execute_cmd_and_validate_return_code(
            site,
            ["omd", "config", "show", name],
            f"Failed to retrieve {name} receiver status.",
        )
        previous_values[name] = stdout.strip()

        logger.info("Setting %s receiver to 'on'...", name)
        _execute_cmd_and_validate_return_code(
            site,
            ["omd", "config", "set", name, "on"],
            f"Failed to change {name} receiver.",
        )

    site.start()

    yield

    site.stop()
    # dicts keep insertion order, so the receivers are restored in the order they were read
    for name, previous_value in previous_values.items():
        logger.info("Setting %s receiver to '%s'...", name, previous_value)
        _execute_cmd_and_validate_return_code(
            site,
            ["omd", "config", "set", name, previous_value],
            f"Failed to change {name} receiver.",
        )


@pytest.fixture(name="enable_snmp_trap_translation", scope="function")
def _enable_snmp_trap_translation(site: Site) -> Iterator[None]:
    """Enable SNMP trap translation for one test via an extra EC settings file.

    The file `etc/check_mk/mkeventd.d/wato/9999-test_ec.mk` is written and activated on
    setup, and deleted (followed by another activation) on teardown.
    """
    settings_file = str(site.path("etc/check_mk/mkeventd.d/wato/9999-test_ec.mk"))

    logger.info("Enabling SNMP trap translation...")
    site.write_file(settings_file, "translate_snmptraps = (True, {})")
    _activate_ec_changes(site)

    yield

    logger.info("Disabling SNMP trap translation...")
    site.delete_file(settings_file)
    _activate_ec_changes(site)


@pytest.mark.skip_if_edition("saas")  # reason="EC is disabled in the SaaS edition"
def test_ec_rule_match_events_pipe(site: Site, setup_ec: Iterator) -> None:
    """Send a message matching the EC rule via the events pipe; expect exactly one event.

    The event must carry the state of the rule and the exact message text.
    """
    match, rule_id, rule_state = setup_ec
    sent_message = f"some {match} status"

    _generate_message_via_events_pipe(site, sent_message)

    # the rule itself must be known to livestatus
    known_rule_ids = _wait_for_queried_column(site, "GET eventconsolerules\nColumns: rule_id\n")
    assert rule_id in known_rule_ids

    # exactly one event, in the state configured in the rule
    event_states = _wait_for_queried_column(
        site, "GET eventconsoleevents\nColumns: event_state\n"
    )
    assert len(event_states) == 1
    assert event_states[0] == rule_state

    # exactly one event, holding the message as sent
    event_texts = _wait_for_queried_column(site, "GET eventconsoleevents\nColumns: event_text")
    assert len(event_texts) == 1
    assert event_texts[0] == sent_message


@pytest.mark.skip_if_edition("saas")  # reason="EC is disabled in the SaaS edition"
def test_ec_rule_no_match_events_pipe(site: Site, setup_ec: Iterator) -> None:
    """Send a message matching no EC rule via the events pipe; expect no event at all."""
    match, _, _ = setup_ec
    sent_message = "some other status"
    # guard against the fixture's match text accidentally occurring in the message
    assert match not in sent_message

    _generate_message_via_events_pipe(site, sent_message)

    # non-strict polling with few retries: an empty result is the expected outcome
    event_states = _wait_for_queried_column(
        site, "GET eventconsoleevents\nColumns: event_state\n", max_count=3, strict=False
    )
    assert not event_states

    event_texts = _wait_for_queried_column(
        site, "GET eventconsoleevents\nColumns: event_text", max_count=3, strict=False
    )
    assert not event_texts


@pytest.mark.skip_if_edition("saas")  # reason="EC is disabled in the SaaS edition"
def test_ec_rule_match_snmp_trap(site: Site, setup_ec: Iterator, enable_receivers: None) -> None:
    """Send a message matching the EC rule via SNMP trap; expect exactly one event.

    The event must carry the state of the rule and its text must contain the message.
    """
    match, rule_id, rule_state = setup_ec
    sent_message = f"some {match} status"

    trap_cmd = _get_snmp_trap_cmd(sent_message)
    _execute_cmd_and_validate_return_code(site, trap_cmd, "Failed to send message via SNMP trap.")

    # wait for the message to show up in the EC log before querying livestatus
    _wait_for_event_message_in_log(site, sent_message, Path("var/log/mkeventd.log"))

    # the rule itself must be known to livestatus
    known_rule_ids = _wait_for_queried_column(site, "GET eventconsolerules\nColumns: rule_id\n")
    assert rule_id in known_rule_ids

    # exactly one event, in the state configured in the rule
    event_states = _wait_for_queried_column(
        site, "GET eventconsoleevents\nColumns: event_state\n"
    )
    assert len(event_states) == 1
    assert event_states[0] == rule_state

    # exactly one event; the sent message only needs to be part of its text
    event_texts = _wait_for_queried_column(site, "GET eventconsoleevents\nColumns: event_text")
    assert len(event_texts) == 1
    assert sent_message in event_texts[0]


@pytest.mark.skip_if_edition("saas")  # reason="EC is disabled in the SaaS edition"
def test_ec_rule_no_match_snmp_trap(site: Site, setup_ec: Iterator, enable_receivers: None) -> None:
    """Send a message matching no EC rule via SNMP trap; expect no event at all."""
    match, _, _ = setup_ec
    sent_message = "some other status"
    # guard against the fixture's match text accidentally occurring in the message
    assert match not in sent_message

    trap_cmd = _get_snmp_trap_cmd(sent_message)
    _execute_cmd_and_validate_return_code(site, trap_cmd, "Failed to send message via SNMP trap.")

    # non-strict polling with few retries: an empty result is the expected outcome
    event_states = _wait_for_queried_column(
        site, "GET eventconsoleevents\nColumns: event_state\n", max_count=3, strict=False
    )
    assert not event_states

    event_texts = _wait_for_queried_column(
        site, "GET eventconsoleevents\nColumns: event_text", max_count=3, strict=False
    )
    assert not event_texts


@pytest.mark.skip_if_edition("saas")  # reason="EC is disabled in the SaaS edition"
def test_ec_global_settings(
    site: Site, setup_ec: Iterator, enable_receivers: None, enable_snmp_trap_translation: None
) -> None:
    """Assert that global settings of the EC are applied to the EC

    * SNMP traps translation is activated by the `enable_snmp_trap_translation` fixture
    * A message matching the EC rule is sent via SNMP trap
    * The text of the single resulting event must match the pattern `SNMP.*MIB`
    """
    match, _, _ = setup_ec
    sent_message = f"some {match} status"

    trap_cmd = _get_snmp_trap_cmd(sent_message)
    _execute_cmd_and_validate_return_code(site, trap_cmd, "Failed to send message via SNMP trap.")

    event_texts = _wait_for_queried_column(site, "GET eventconsoleevents\nColumns: event_text")
    assert len(event_texts) == 1

    event_text = event_texts[0]
    pattern = "SNMP.*MIB"  # pattern expected after SNMP traps translation
    assert re.search(pattern, event_text), (
        f"{pattern} not found in the event message:\n {event_text}"
    )


@pytest.mark.skip_if_edition("saas")  # reason="EC is disabled in the SaaS edition"
@pytest.mark.parametrize("udp_enabled", [True, False], ids=["udp", "tcp"])
def test_ec_rule_match_syslog(
    site: Site,
    setup_ec: Iterator,
    enable_receivers: None,
    udp_enabled: bool,
) -> None:
    """Send a message matching the EC rule via Syslog; expect exactly one event.

    Runs once with `udp_enabled=True` ("udp") and once with `udp_enabled=False` ("tcp").
    The event must carry the state of the rule and the exact message text.
    """
    match, rule_id, rule_state = setup_ec
    sent_message = f"some {match} status"

    _generate_message_via_syslog(site, sent_message, udp=udp_enabled)

    # the rule itself must be known to livestatus
    known_rule_ids = _wait_for_queried_column(site, "GET eventconsolerules\nColumns: rule_id\n")
    assert rule_id in known_rule_ids

    # exactly one event, in the state configured in the rule
    event_states = _wait_for_queried_column(
        site, "GET eventconsoleevents\nColumns: event_state\n"
    )
    assert len(event_states) == 1
    assert event_states[0] == rule_state

    # exactly one event, holding the message as sent
    event_texts = _wait_for_queried_column(site, "GET eventconsoleevents\nColumns: event_text")
    assert len(event_texts) == 1
    assert event_texts[0] == sent_message


@pytest.mark.skip_if_edition("saas")  # reason="EC is disabled in the SaaS edition"
def test_ec_rule_no_eol(site: Site, setup_ec: Iterator, enable_receivers: None) -> None:
    """Send a rule-matching message without end-of-line via events pipe and Syslog.

    Assert:
        * an event is NOT created via events pipe
        * an event is NOT created via Syslog TCP
        * an event is created via Syslog UDP
    """
    match, _, _ = setup_ec
    sent_message = f"some {match} status"

    # step 1: events pipe and Syslog TCP, both without end-of-line -> no event expected
    _generate_message_via_events_pipe(site, sent_message, end_of_line=False)
    _generate_message_via_syslog(site, sent_message, udp=False, end_of_line=False)

    event_states = _wait_for_queried_column(
        site, "GET eventconsoleevents\nColumns: event_state\n", max_count=3, strict=False
    )
    assert not event_states

    event_texts = _wait_for_queried_column(
        site, "GET eventconsoleevents\nColumns: event_text", max_count=3, strict=False
    )
    assert not event_texts

    # step 2: Syslog UDP without end-of-line -> exactly one event expected
    _generate_message_via_syslog(site, sent_message, udp=True, end_of_line=False)

    event_texts = _wait_for_queried_column(site, "GET eventconsoleevents\nColumns: event_text")
    assert len(event_texts) == 1
    assert event_texts[0] == sent_message
